Source code for nlp_architect.models.absa.inference.inference

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import math
from os import PathLike
from pathlib import Path
from typing import Union

from nlp_architect.common.core_nlp_doc import CoreNLPDoc
from nlp_architect.models.absa import INFERENCE_OUT
from nlp_architect.models.absa.inference.data_types import Term, TermType, Polarity, SentimentDoc,\
    SentimentSentence, LexiconElement
from nlp_architect.models.absa.utils import _read_lexicon_from_csv, load_opinion_lex, \
    _load_aspect_lexicon

INTENSIFIER_FACTOR = 0.3
VERB_POS = {"VB", "VBD", "VBG", "VBN", "VBP", "VBZ"}


[docs]class SentimentInference(object): """Main class for sentiment inference execution. Attributes: opinion_lex: Opinion lexicon as outputted by TrainSentiment module. aspect_lex: Aspect lexicon as outputted by TrainSentiment module. intensifier_lex (dict): Pre-defined intensifier lexicon. negation_lex (dict): Pre-defined negation lexicon. """ def __init__(self, aspect_lex: Union[str, PathLike], opinion_lex: Union[str, PathLike, dict], parse: bool = True): """Inits SentimentInference with given aspect and opinion lexicons.""" INFERENCE_OUT.mkdir(parents=True, exist_ok=True) self.opinion_lex = \ opinion_lex if type(opinion_lex) is dict else load_opinion_lex(Path(opinion_lex)) self.aspect_lex = _load_aspect_lexicon(Path(aspect_lex)) self.intensifier_lex = _read_lexicon_from_csv('IntensifiersLex.csv') self.negation_lex = _read_lexicon_from_csv('NegationSentLex.csv') if parse: from nlp_architect.pipelines.spacy_bist import SpacyBISTParser self.parser = SpacyBISTParser(spacy_model='en') else: self.parser = None
[docs] def run(self, doc: str = None, parsed_doc: CoreNLPDoc = None) -> SentimentDoc: """Run SentimentInference on a single document. Returns: The sentiment annotated document, which contains the detected events per sentence. """ if not parsed_doc: if not self.parser: raise RuntimeError("Parser not initialized (try parse=True at init )") parsed_doc = self.parser.parse(doc) sentiment_doc = None for sentence in parsed_doc.sentences: events = [] scores = [] for aspect_row in self.aspect_lex: _, asp_events = self._extract_event(aspect_row, sentence) for asp_event in asp_events: events.append(asp_event) scores += [term.score for term in asp_event if term.type == TermType.ASPECT] if events: if not sentiment_doc: sentiment_doc = SentimentDoc(parsed_doc.doc_text) sentiment_doc.sentences.append( SentimentSentence(sentence[0]['start'], sentence[-1]['start'] + sentence[-1]['len'] - 1, events)) return sentiment_doc
def _extract_intensifier_terms(self, toks, sentiment_index, polarity, sentence): """Extract intensifier events from sentence.""" count = 0 terms = [] for intens_i, intens in [(i, x) for i, x in enumerate(toks) if x in self.intensifier_lex]: if math.fabs(sentiment_index - intens_i) == 1: score = self.intensifier_lex[intens].score terms.append(Term(intens, TermType.INTENSIFIER, polarity, score, sentence[intens_i]['start'], sentence[intens_i]['len'])) count += abs(score + float(INTENSIFIER_FACTOR)) return count if count != 0 else 1, terms def _extract_neg_terms(self, toks: list, op_i: int, sentence: list) -> tuple: """Extract negation terms from sentence. Args: toks: Sentence text broken down to tokens (words). op_i: Index of opinion term in sentence. sentence: parsed sentence Returns: List of negation terms and its aggregated sign (positive or negative). """ sign = 1 terms = [] gov_op_i = sentence[op_i]['gov'] dep_op_indices = [sentence.index(x) for x in sentence if x['gov'] == op_i] for neg_i, negation in [(i, x) for i, x in enumerate(toks) if x in self.negation_lex]: position = self.negation_lex[negation].position dist = op_i - neg_i before = position == 'before' and (dist == 1 or neg_i in dep_op_indices) after = position == 'after' and (dist == -1 or neg_i == gov_op_i) both = position == 'both' and dist in (1, -1) if before or after or both: terms.append(Term(negation, TermType.NEGATION, Polarity.NEG, self.negation_lex[negation].score, sentence[toks.index(negation)]['start'], sentence[toks.index(negation)]['len'])) sign *= self.negation_lex[negation].score return terms, sign def _extract_event(self, aspect_row: LexiconElement, parsed_sentence: list) -> tuple: """Extract opinion and aspect terms from sentence.""" event = [] sent_aspect_pair = None real_aspect_indices = _consolidate_aspects(aspect_row.term, parsed_sentence) aspect_key = aspect_row.term[0] for aspect_index_range in real_aspect_indices: for word_index in aspect_index_range: sent_aspect_pair, event = \ self._detect_opinion_aspect_events(word_index, parsed_sentence, aspect_key, aspect_index_range) if sent_aspect_pair: break return sent_aspect_pair, event @staticmethod def _modify_for_multiple_word(cur_tkn, parsed_sentence, index_range): """Modify multiple-word aspect tkn length and start index. Args: index_range: The index range of the multi-word aspect. Returns: The modified aspect token. """ if len(index_range) >= 2: cur_tkn["start"] = parsed_sentence[index_range[0]]["start"] cur_tkn["len"] = len(parsed_sentence[index_range[0]]["text"]) for i in index_range[1:]: cur_tkn["len"] = int(cur_tkn["len"]) + len( parsed_sentence[i]["text"]) + 1 return cur_tkn def _detect_opinion_aspect_events(self, aspect_index, parsed_sent, aspect_key, index_range): """Extract opinion-aspect events from sentence. Args: aspect_index: index of aspect in sentence. parsed_sent: current sentence parse tree. aspect_key: main aspect term serves as key in aspect dict. index_range: The index range of the multi word aspect. Returns: List of aspect sentiment pair, and list of events extracted. """ all_pairs, events = [], [] sentence_text_list = [x["text"] for x in parsed_sent] sentence_text = ' '.join(sentence_text_list) for tok_i, tok in enumerate(parsed_sent): aspect_op_pair = [] terms = [] gov_i = tok['gov'] gov = parsed_sent[gov_i] gov_text = gov['text'] tok_text = tok['text'] # 1st order rules # Is cur_tkn an aspect and gov an opinion? if tok_i == aspect_index: if gov_text.lower() in self.opinion_lex: aspect_op_pair.append( (self._modify_for_multiple_word(tok, parsed_sent, index_range), gov)) # Is gov an aspect and cur_tkn an opinion? if gov_i == aspect_index and tok_text.lower() in self.opinion_lex: aspect_op_pair.append( (self._modify_for_multiple_word(gov, parsed_sent, index_range), tok)) # If not found, try 2nd order rules if not aspect_op_pair and tok_i == aspect_index: # 2nd order rule #1 for op_t in parsed_sent: if op_t['gov'] == gov_i and op_t['text'].lower() in self.opinion_lex: aspect_op_pair.append( (self._modify_for_multiple_word(tok, parsed_sent, index_range), op_t)) # 2nd order rule #2 gov_gov = parsed_sent[parsed_sent[gov_i]['gov']] if gov_gov['text'].lower() in self.opinion_lex: aspect_op_pair.append( (self._modify_for_multiple_word(tok, parsed_sent, index_range), gov_gov)) # if aspect_tok found for aspect, opinion in aspect_op_pair: op_tok_i = parsed_sent.index(opinion) score = self.opinion_lex[opinion['text'].lower()].score neg_terms, sign = self._extract_neg_terms(sentence_text_list, op_tok_i, parsed_sent) polarity = Polarity.POS if score * sign > 0 else Polarity.NEG intensifier_score, intensifier_terms = self._extract_intensifier_terms( sentence_text_list, op_tok_i, polarity, parsed_sent) over_all_score = score * sign * intensifier_score terms.append(Term(aspect_key, TermType.ASPECT, polarity, over_all_score, aspect['start'], aspect['len'])) terms.append(Term(opinion['text'], TermType.OPINION, polarity, over_all_score, opinion['start'], opinion['len'])) if len(neg_terms) > 0: terms = terms + neg_terms if len(intensifier_terms) > 0: terms = terms + intensifier_terms all_pairs.append([aspect_key, opinion['text'], over_all_score, polarity, sentence_text]) events.append(terms) return all_pairs, events
def _sentence_contains_after(sentence, index, phrase): """Returns sentence contains phrase after given index.""" for i in range(len(phrase)): if len(sentence) <= index + i or phrase[i].lower() not in \ {sentence[index + i][field].lower() for field in ('text', 'lemma')}: return False return True def _consolidate_aspects(aspect_row, sentence): """Returns consolidated indices of aspect terms in sentence. Args: aspect_row: List of aspect terms which belong to the same aspect-group. """ indices = [] aspect_phrases: list = \ sorted([phrase.split(' ') for phrase in aspect_row], key=len, reverse=True) appeared = set() for tok_i in range(len(sentence)): for aspect_phrase in aspect_phrases: if _sentence_contains_after(sentence, tok_i, aspect_phrase): span = range(tok_i, tok_i + len(aspect_phrase)) if not appeared & set(span): appeared |= set(span) indices.append(list(span)) return indices